


#include <unistd.h>
#include <stdlib.h>
#include "qelib.h"
#include "token_bucket.h"
#include "traffic_metric.h"



#define TC_LOGNAME          "tc"
#define tc_debug(...)       qelog_debug(TC_LOGNAME,   __VA_ARGS__)
#define tc_info(...)        qelog_info(TC_LOGNAME,    __VA_ARGS__)
#define tc_notice(...)      qelog_notice(TC_LOGNAME,  __VA_ARGS__)
#define tc_warning(...)     qelog_warning(TC_LOGNAME, __VA_ARGS__)
#define tc_error(...)       qelog_error(TC_LOGNAME,   __VA_ARGS__)


#define PRIOR_QUEUE_SIZE    (2)


typedef struct
{
    int size;
    int prior;
} packet;

typedef struct
{
    qe_ringq *q;
    pthread_mutex_t lock;
    pthread_t thread;
    traffic_metric in_metric;
    traffic_metric out_metric;
} prior_queue;

typedef struct
{
    prior_queue *pqs;
    fix_speed_token *token;
} tc;

static tc t;

static void process_packet(packet *pkt, fix_speed_token *token, 
    traffic_metric *m1, traffic_metric *m2)
{
    double t1, t2;

    if (traffic_metric_update(m1, pkt->size)) {
        t1 = m1->throughput;
    }

    if (fix_speed_token_get(token, pkt->size)) {
        if (traffic_metric_update(m2, pkt->size)) {
            t2 = m2->throughput;
            tc_debug("t1 %.03f t2 %.03f", t1, t2);
        }
    }
}

static int random_int(int min, int max)
{
    static qe_bool init = qe_false;

    if (init == qe_false) {
        time_t now;
        srand(time(&now));
        init = qe_true;
    }

    return rand() % max + min;
}

static int tc_init(tc *t, int queue_size)
{
    t->pqs = qe_malloc(sizeof(prior_queue) * queue_size);
    return 0;
}

static void *queue_thread(void *data)
{
    int i, j;
    int priority;
    packet pkt;
    traffic_metric *itm, *otm;
    qe_ret ret;
    prior_queue *pq;
    qe_bool updated = qe_false;

    while (1) {
        updated = qe_false;
        // for (i=PRIOR_QUEUE_SIZE-1; i>0; i--) {
        //     tc_debug("[%d] wait %d", 
        //         i, qe_ringq_wait(t.pqs[i].q));
        // }

        for (i=PRIOR_QUEUE_SIZE-1; i>=0; i--) {
            priority = i;
            pq = &t.pqs[priority];
            itm = &pq->in_metric;
            otm = &pq->out_metric;

            if (qe_ringq_isempty(pq->q)) {
                continue;
            }

__repeat:
            pthread_mutex_lock(&pq->lock);
            ret = qe_ringq_deq(pq->q, &pkt, 1);
            pthread_mutex_unlock(&pq->lock);
            //tc_debug("queue[%d] %d deq", i, qe_ringq_wait(pq->q));
            if (ret == qe_ok) {
                if (fix_speed_token_get(t.token, pkt.size)) {
                    if (traffic_metric_update(otm, pkt.size)) {
                        tc_debug("queue[%d] tx %.4f tx %.4f", 
                            priority, itm->throughput, otm->throughput);
                        updated = qe_true;
                    }
                    if (!qe_ringq_isempty(pq->q))
                        goto __repeat;
                    else
                        goto __skip_loop;
                } else {
                    //tc_error("no token, drop");
                }
            }
        }

__skip_loop:
        if (updated)
            tc_debug("");
        usleep(50000);
    }
}

static void enqueue_packet(packet *pkt)
{
    int priority;
    prior_queue *pq;
    traffic_metric *tm;
    
    priority = pkt->prior;
    pq = &t.pqs[priority];
    tm = &pq->in_metric;

    //tc_debug("in metric period %d %u", tm->period_ms, tm->ts);

    if (traffic_metric_update(tm, pkt->size)) {
        //tc_debug("queue[%d] rx %.4f", priority, tm->throughput);
    }

    pthread_mutex_lock(&pq->lock);
    if (qe_ringq_isfull(pq->q) && priority == 1) {
        tc_debug("drop queue[%d] last pkt", priority);
    }
    qe_ringq_enq(pq->q, pkt, 1);
    //tc_debug("pkt %d %d enqueue", pkt->prior, pkt->size);
    pthread_mutex_unlock(&pq->lock);
}

int main(int argc, char *argv[])
{
    int i;
    packet pkt;
    fix_speed_token *tb;
    prior_queue *pq;

    qelog_init(QELOG_DEBUG, QELOG_DM | QELOG_DATE | QELOG_CL | QELOG_LV);

    tc_init(&t, PRIOR_QUEUE_SIZE);
    t.token = fix_speed_token_new(100000, 100000, 100000);

    tc_debug("tc init");

    for (i=0; i<PRIOR_QUEUE_SIZE; i++) {
        pq = &t.pqs[i];
        pq->q = qe_ringq_create(sizeof(packet), 128);
        pthread_mutex_init(&pq->lock, QE_NULL);
        pq->in_metric.throughput = 0;
        pq->in_metric.value = 0;
        pq->out_metric.throughput = 0;
        pq->out_metric.value = 0;
        traffic_metric_set_period(&pq->in_metric, 2000);
        traffic_metric_set_period(&pq->out_metric, 2000);
        tc_debug("queue[%d] queue init", i);
    }

    pthread_create(&pq->thread, QE_NULL, queue_thread, &i);

    tc_debug("queue init ok");

    for (i=0; i<50000; i++) {
        pkt.prior = random_int(0, PRIOR_QUEUE_SIZE);
        //pkt.size = random_int(64, 1500);
        pkt.size = 1000;
        //process_packet(&pkt, token, &tm1, &tm2);
        enqueue_packet(&pkt);
        usleep(5000);
    }

    return 0;
}